/**
 * @project: parallel-task
 * @package: com.ngplat.paralleltask.task
 * @filename: TaskPoolManager.java
 *
 * Copyright (c) 2018 eSunny Info. Tech Ltd. All rights reserved.
 * 
 */
package com.ngplat.paralleltask.task;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import com.ngplat.paralleltask.common.Cleanable;
import com.ngplat.paralleltask.constants.TaskState;
import com.ngplat.paralleltask.exception.TaskExecutionException;
import com.ngplat.paralleltask.model.JobInfo;
import com.ngplat.paralleltask.model.TaskInfo;
import com.ngplat.paralleltask.task.config.CleanableThreadPoolExecutor;
import com.ngplat.paralleltask.task.config.DefaultThreadPoolConfig;
import com.ngplat.paralleltask.task.config.ThreadPoolConfig;
import com.ngplat.paralleltask.utils.Reflections;
import com.ngplat.paralleltask.utils.ThreadUtils;

/**
 * @typename: TaskPoolManager
 * @brief: 任务管理器
 * @author: KI ZCQ
 * @date: 2018年4月28日 上午10:37:53
 * @version: 1.0.0
 * @since
 * 
 */
public class TaskPoolManager implements Cleanable {

	private final static Logger logger = LoggerFactory.getLogger(TaskPoolManager.class);

	// 线程池默认配置
	private volatile ThreadPoolConfig config = new DefaultThreadPoolConfig();

	// 默认阻塞队列
	private volatile BlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(config.getMaxCacheTaskNum());

	// 作业管理池
	private Map<String, JobInfo> jobPool = new ConcurrentHashMap<>();

	/**
	 * 启动时, 线程池使用默认配置 如果任务池已开始执行, 则不可修改配置
	 */
	private static AtomicBoolean isStarted = new AtomicBoolean(false);

	/**
	 * 可执行任务数
	 */
	private AtomicInteger taskCount = new AtomicInteger(0);

	/**
	 * 任务执行单元
	 */
	private WorkUnit workUnit;

	// 任务执行线程池
	private volatile ThreadPoolExecutor threadPool = new CleanableThreadPoolExecutor(config.getCoreTaskNum(),
			config.getMaxTaskNum(), 10, TimeUnit.SECONDS, taskQueue, new CustomizableThreadFactory("parallel-task-pool-"),
			new ThreadPoolExecutor.CallerRunsPolicy());

	// singleton instance
	public static TaskPoolManager DEFAULT = new TaskPoolManager();

	/**
	 * create a new instance TaskManager.
	 */
	private TaskPoolManager() {
		// 打印默认线程池配置信息
		logThreadPoolInfo();
	}

	/**
	 * @Description: 重新加载线程池配置
	 * @param tpConfig
	 *            自定义线程池配置信息
	 */
	public void refreshConfig(ThreadPoolConfig tpConfig) {
			threadPool.shutdownNow();
			config = tpConfig;
			taskQueue = new ArrayBlockingQueue<Runnable>(config.getMaxCacheTaskNum());
			threadPool = new CleanableThreadPoolExecutor(config.getCoreTaskNum(), config.getMaxTaskNum(), 10, TimeUnit.SECONDS,
					taskQueue, new CustomizableThreadFactory("multi-task-pool-"),
					new ThreadPoolExecutor.CallerRunsPolicy());
			logThreadPoolInfo();
	}

	/**
	 * @Description: 打印当前线程池配置信息
	 */
	public void logThreadPoolInfo() {
		String logFormat = "[*** CORE_TASK_NUM:%s MAX_TASK_NUM:%s MAX_CACHE_TASK_NUM:%s "
				+ "TASK_TIMEOUT_MILL_SECONDS:%s AVAILABLE_PROCESSORs:%s ***]";
		logger.info(String.format(logFormat, config.getCoreTaskNum(), config.getMaxTaskNum(), config.getMaxCacheTaskNum(),
				config.getTaskTimeoutMillSeconds(), Runtime.getRuntime().availableProcessors()));
	}

	/**
	 * @Description: 批量添加任务信息, 生成可执行任务
	 * @param taskList 任务列表
	 */
	public synchronized void addTasks(List<TaskInfo> taskList) {
		if (taskList == null || taskList.size() == 0) {
			throw new NullPointerException("任务信息列表为空, 请检查.");
		}

		for (TaskInfo task : taskList) {
			// processor
			String beanClass = task.getBeanClass();

			if (StringUtils.isBlank(beanClass)) {
				throw new RuntimeException("任务处理器对应Bean Class is null, 请检查." + task.toString());
			}
			// 反射生成对象
			Object obj;
			try {
				obj = Reflections.createObject(beanClass);
			} catch (Exception e) {
				logger.error("通过反射生成TaskProcessor对象失败, 请检查, e: {}", e);
				throw Reflections.convertReflectionExceptionToUnchecked(e);
			}

			if (obj instanceof TaskProcessor) {
				this.addTask(task, (TaskProcessor) obj);
			} else {
				throw new RuntimeException(
						String.format("通过BeanClass{}生成的任务处理器非TaskProcessor的实例, 请检查." + task.toString(), beanClass));
			}
		}
	}
	
	/**
	 * @Description: 开始执行任务
	 * @return
	 */
	public synchronized void start() {
		// 线程池执行过程中不允许初始化
		if (isStarted.compareAndSet(false, true)) {
			// 创建一个任务执行单元
			workUnit = this.newWorkUnit();
			// 开始执行每组作业的root任务
			if (jobPool == null || jobPool.size() == 0) {
				throw new TaskExecutionException("无可执行的任务, 请检查.");
			}

			// 提交任务执行
			for (JobInfo job : jobPool.values()) {
				TaskPool taskPool = job.getTaskPool();
				if (taskPool == null || taskPool.getRootWorkTask() == null) {
					throw new TaskExecutionException("作业" + job.getJobId() + "的任务节点为空, 请检查.");
				}
				// 开始执行任务
				workUnit.submit(taskPool.getRootWorkTask());
			}

			// 等待任务完成(阻塞模式)
			workUnit.waitForCompletion();
		} else {
			throw new TaskExecutionException("任务正在执行, 请先强行中断后, 再次执行.");
		}
	}

	/**
	 * @Description: 具备可执行条件的子任务开始提交执行
	 * @param task
	 */
	public synchronized void submit(WorkerTask task) {
		if (task == null || task.getProcessor() == null) {
			throw new TaskExecutionException("任务为null 或者 任务处理器为null, 请检查." + task.toString());
		}
		// 打印当前任务信息
		this.printQueueSize();
		// 任务队列满了, 则等待
        this.checkQueueFullThenSleep();

		// 是否可以执行, 不满足条件, 不提交线程池执行
		if (task.isExecuted()) {
			workUnit.submit(task);
		}
		// 以下两个分支可以合并, 但是为了打印日志, 这里选择不合并
		else if (task.isFinished()) {
			logger.info("该任务状态为已完成, 不再重复执行, 通知所有子任务, TaskInfo: {}", task.toString());
			// 任务完成, 发广播
			workUnit.workFinished(task, TaskState.TASK_COMPLETE);
		} else if (task.isDisabled()) {
			logger.info("该任务状态为D, 不可执行, 通知所有子任务, TaskInfo: {}", task.toString());
			// 该任务不执行, 但是要通知依赖
			workUnit.workFinished(task, TaskState.TASK_COMPLETE);
		} else {
			throw new TaskExecutionException("任务可执行条件未达到, 不可执行." + task.toString());
		}
	}

	/**
	 * @Description: 构建一组工作单元，为并行执行的原子单位，均运行完成（成功或失败）后返回
	 * @return 一组工作单元
	 */
	private WorkUnit newWorkUnit() {
		// 可执行任务数
		int taskSize = taskCount.get();
		// 作业数(每个作业有一个根任务)
		int poolSize = jobPool.size();
		logger.info(String.format("TaskManager - current task size is:[%d], JobPool size is:[%d]", taskSize, poolSize));
		// 总任务数
		int totalTaskCount = taskSize + poolSize;
		return new WorkUnit(totalTaskCount, threadPool);
	}

	/**
	 * @Description: 添加任务
	 * @param task
	 * @param service
	 */
	public synchronized void addTask(TaskInfo task, TaskProcessor processor) {

		if (task == null || processor == null || task.getTaskId() == null) {
			throw new NullPointerException("task is null or processor is null, info: " + task.toString());
		}

		// 组Id
		String jobId = task.getJobId();
		if (StringUtils.isBlank(jobId)) {
			jobId = JobInfo.DEFAULT_GROUP_ID;
		}

		// 新建job
		if (!jobPool.containsKey(jobId)) {
			jobPool.put(jobId, new JobInfo(jobId));
		}
		// 作业信息(组概念)
		JobInfo jobInfo = jobPool.get(jobId);
		// 添加任务
		jobInfo.getTaskPool().addTask(task, processor);
		// 任务计数器
		taskCount.incrementAndGet();
	}

	/**
	 * @Description: 对子任务发送任务完成通知
	 * @param task
	 * @param taskComplete
	 */
	public void fireEvent(TaskInfo taskInfo, TaskState state) {
		// 组Id
		String jobId = taskInfo.getJobId();
		// 所属组
		JobInfo jobInfo = jobPool.get(jobId);
		// 发通知
		jobInfo.getTaskPool().fireEvent(taskInfo.getTaskId(), state);
	}
	
	/**
	 * @Description: 任务队列满了, 则等待
	 */
	private void checkQueueFullThenSleep() {
        // 避免queue满后抛异常
        while (taskQueue.size() >= config.getMaxCacheTaskNum()) {
            logger.info("TaskPoolManager thread pool blocking queue is full.");
            sleep(config.getQueueFullSleepTime());
        }
    }

	/**
	 * @Description: 打印当前任务队列大小
	 */
	private void printQueueSize() {
		if (taskQueue.size() > 1) {
            logger.debug("TaskPoolManager current taskQueue size is:" + taskQueue.size());
        }
	}
	
    /**
     * @Description: 封装异常
     * @param miliSeconds
     */
    private void sleep(int miliSeconds) {
        try {
            Thread.sleep(miliSeconds);
        } catch (InterruptedException e) {
            logger.error("Thread sleep is interrupted", e);
        }
    }
    
    /**
     * @Description: 启动可并行Task
     * @param task
     * @return Task执行的Future
     */
    public <T> Future<T> invoke(Callable<T> task) {
        this.printQueueSize();
        this.checkQueueFullThenSleep();
        Future<T> result = threadPool.submit(task);
        return result;
    }

    /**
     * @Description: 启动可并行Task,不关心结果
     * @param task
     */
    public void invoke(Runnable task) {
        this.printQueueSize();
        this.checkQueueFullThenSleep();
        threadPool.execute(task);
    }
    
	/**
	 * config
	 *
	 * @return the config
	 * @since 1.0.0
	 */
	public ThreadPoolConfig getConfig() {
		return config;
	}

	/**
	 * @param config
	 *            the config to set
	 */
	public void setConfig(ThreadPoolConfig config) {
		this.config = config;
	}

	/**
	 * @see com.ngplat.paralleltask.common.Cleanable#cleanResource()
	 */
	@Override
	public void cleanResource() {
		logger.info("[TaskManager:cleanResource]执行cleanResource, 优雅关闭线程池....");
		
		ThreadUtils.gracefulShutdown(threadPool, 0, 0, TimeUnit.MILLISECONDS);
		
		// threadPool.shutdownNow();
		threadPool = null;
		workUnit = null;
		// 重置任务状态
		for (JobInfo job : jobPool.values()) {
			job.getTaskPool().cleanResource();
		}
		// 初始化标志位
		isStarted = new AtomicBoolean(false);
	}

}
